package com.opencee.cloud.msg.mq;

import com.alibaba.fastjson.JSONObject;
import com.opencee.cloud.autoconfigure.utils.RestTemplateUtil;
import com.opencee.cloud.msg.api.constatns.HttpNotifyState;
import com.opencee.cloud.msg.api.constatns.MsgConstants;
import com.opencee.cloud.msg.api.entity.MsgHttpNotifyRecordsEntity;
import com.opencee.cloud.msg.api.vo.DelayedMessage;
import com.opencee.cloud.msg.api.vo.params.EmailTextParams;
import com.opencee.cloud.msg.api.vo.params.HttpNotifyParams;
import com.opencee.cloud.msg.api.vo.params.MessageTaskParams;
import com.opencee.cloud.msg.service.MsgHttpNotifyRecordsService;
import com.opencee.cloud.msg.thread.MessageTaskDispatcher;
import com.opencee.cloud.msg.utils.MqUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * RabbitMQ listener for asynchronous HTTP notification messages.
 * <p>
 * Each received message is POSTed to the notification URL. When the call does not answer with
 * {@code success}, the notification is re-queued with the next delay taken from
 * {@link #RETRY_DELAYED_TIMES} (stepped retry). Once the retry ladder is exhausted an alert
 * e-mail task is dispatched, provided an alert address is configured on the notification.
 *
 * @author liuyadu
 */
@Slf4j
@Configuration
public class DelayedHttpNotifyListener {
    /**
     * Response body (compared case-insensitively) that marks a notification as delivered.
     */
    private static final String SUCCESS = "success";

    @Autowired
    private RestTemplateUtil restTemplateUtil;
    @Autowired
    private MsgHttpNotifyRecordsService httpNotifyRecordsService;
    @Autowired
    private MessageTaskDispatcher dispatcher;
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Name of the HTTP request header that carries the tenant id.
     */
    private static final String HTTP_HEADER_TENANT_ID = "X-Tenant-Id";

    /**
     * Header value that is treated the same as "no delay recorded yet".
     */
    private static final String ZERO = "0";

    /**
     * Name of the message header this listener reads the previous delay from.
     */
    private static final String HEADER_DELAY_TIMES = "delay-times";

    /**
     * Retry ladder. The first push is immediate; subsequent retries are delayed by
     * 5s, 10s, 2min, 5min, 10min, 30min, 1h, 2h, 6h and 15h until the receiver replies with
     * {@code success} or the ladder is exhausted.
     * <p>
     * The list is unmodifiable so that callers cannot alter the shared retry schedule.
     */
    public static final List<Integer> RETRY_DELAYED_TIMES = Collections.unmodifiableList(Arrays.asList(
            5 * 1000,
            10 * 1000,
            2 * 60 * 1000,
            5 * 60 * 1000,
            10 * 60 * 1000,
            30 * 60 * 1000,
            60 * 60 * 1000,
            2 * 60 * 60 * 1000,
            6 * 60 * 60 * 1000,
            15 * 60 * 60 * 1000
    ));

    /**
     * Entry point invoked for every message on the HTTP-notify delayed queue.
     * <p>
     * The message body is parsed as JSON and its {@code body} property is converted to
     * {@link HttpNotifyParams}. Any exception is logged and swallowed here, so a failing
     * message never propagates out of the listener method.
     *
     * @param message the raw AMQP message
     */
    @RabbitListener(queues = MsgConstants.MSG_DELAYED_QUEUE_HTTP_NOTIFY)
    public void onMessage(Message message) {
        try {
            String msgId = message.getMessageProperties().getMessageId();
            String receivedMsg = new String(message.getBody(), StandardCharsets.UTF_8);
            log.debug("http通知消息接收:{}", message);
            JSONObject delayedMessage = JSONObject.parseObject(receivedMsg);
            // An empty payload or a payload without "body" cannot be delivered; skip it
            // explicitly instead of failing later with a NullPointerException.
            HttpNotifyParams httpNotifyMessage = delayedMessage == null
                    ? null
                    : delayedMessage.getObject("body", HttpNotifyParams.class);
            if (httpNotifyMessage == null) {
                log.warn("http notify message ignored, no body found. msgId={}", msgId);
                return;
            }
            send(msgId, message, httpNotifyMessage);
        } catch (Exception e) {
            // Deliberate best-effort: log and drop so the listener itself never throws.
            log.error("http通知消息接收错误:", e);
        }
    }


    /**
     * Delivers one notification and records the outcome.
     * <ul>
     *   <li>Nothing happens when no record exists for {@code msgId} or the record is not in
     *       {@link HttpNotifyState#NORMAL} state.</li>
     *   <li>On a {@code success} reply the record result is set to {@code 1}.</li>
     *   <li>Otherwise the notification is re-queued with the next delay and the record counters
     *       are incremented; when no further delay is available an alert e-mail is dispatched
     *       if {@code notify.getAlertEmail()} is not empty.</li>
     * </ul>
     *
     * @param msgId   id of the message, used to look up the notification record
     * @param message the AMQP message; its {@code delay-times} header gives the previous delay
     * @param notify  the notification to deliver
     * @throws Exception if re-queueing or persisting the outcome fails
     */
    protected void send(String msgId, Message message, HttpNotifyParams notify) throws Exception {
        MsgHttpNotifyRecordsEntity record = httpNotifyRecordsService.getByMsgId(msgId);
        if (record == null) {
            return;
        }
        if (!HttpNotifyState.NORMAL.getValue().equals(record.getStatus())) {
            return;
        }
        String originalExpiration = readDelayTimes(message);
        String httpResult = postNotification(record, notify);

        if (SUCCESS.equalsIgnoreCase(httpResult)) {
            log.debug("result success {}", notify.getUrl());
            // Delivered (possibly after earlier failed attempts): mark the record as successful.
            MsgHttpNotifyRecordsEntity m = new MsgHttpNotifyRecordsEntity();
            m.setId(record.getId());
            m.setResult(1);
            modifyLog(m);
            return;
        }

        // Delivery failed: schedule the next attempt if the retry ladder allows it.
        log.debug("result fail do retry {}", notify.getUrl());
        Integer next = retry(record, notify, originalExpiration);
        if (next != null) {
            // Persist the attempt counters and the delay that was just scheduled.
            MsgHttpNotifyRecordsEntity m = new MsgHttpNotifyRecordsEntity();
            m.setId(record.getId());
            m.setTotalNums(record.getTotalNums() + 1);
            m.setRetryNums(record.getRetryNums() + 1);
            m.setDelay(Long.valueOf(next));
            modifyLog(m);
        } else {
            // Last attempt failed as well.
            // NOTE(review): the record itself is not updated on this path - confirm whether a
            // final failure state should be persisted.
            sendAlertEmail(record, notify);
        }
    }

    /**
     * Reads the previous delay from the {@code delay-times} message header.
     *
     * @return the header value as text, or an empty string when the header is absent or is zero
     */
    private String readDelayTimes(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        if (headers == null) {
            return "";
        }
        Object times = headers.get(HEADER_DELAY_TIMES);
        if (times == null) {
            return "";
        }
        // Compare the textual form so that a numeric 0 header is recognised as well as "0".
        String value = times.toString().trim();
        return ZERO.equals(value) ? "" : value;
    }

    /**
     * POSTs the notification content to the notification URL with the tenant header set.
     *
     * @return the response body, or an empty string when the call threw an exception
     */
    private String postNotification(MsgHttpNotifyRecordsEntity record, HttpNotifyParams notify) {
        try {
            Map<String, String> httpHeaders = new HashMap<>(8);
            // Add the tenant request header.
            httpHeaders.put(HTTP_HEADER_TENANT_ID, record.getTenantId());
            Map<String, String> params = new HashMap<>(8);
            if (notify.getContent() != null) {
                Set<String> keys = notify.getContent().keySet();
                for (String key : keys) {
                    Object value = notify.getContent().get(key);
                    // Null values are sent as empty strings.
                    params.put(key, value != null ? String.valueOf(value) : "");
                }
            }
            return restTemplateUtil.post(notify.getUrl(), httpHeaders, params, String.class);
        } catch (Exception e) {
            // The URL fills the placeholder; the trailing exception is logged with its stack trace.
            log.error("http error:{}", notify.getUrl(), e);
            return "";
        }
    }

    /**
     * Dispatches an alert e-mail task describing the failed notification.
     * Does nothing when the notification has no alert address; multiple addresses are
     * separated by {@code ;}.
     */
    private void sendAlertEmail(MsgHttpNotifyRecordsEntity record, HttpNotifyParams notify) {
        if (StringUtils.isEmpty(notify.getAlertEmail())) {
            return;
        }
        log.debug("send email alert {}", notify.getUrl());
        String subject = String.format("异步通知【%s】失败告警", notify.getTitle());
        String content = String.format("标题：%s 类型：%s 通知地址：%s 重试次数：%s ID: %s MsgId: %s 业务编号：%s", notify.getTitle(), record.getType(), notify.getUrl(), record.getRetryNums(), record.getId(), record.getMsgId(), notify.getKey());
        MessageTaskParams taskParams = new MessageTaskParams();
        taskParams.setAppId("0");
        taskParams.setChannelKey("");
        // TODO: carried over from the original code (no further detail given there).
        EmailTextParams email = new EmailTextParams();
        email.setTo(new HashSet<>(Arrays.asList(StringUtils.delimitedListToStringArray(notify.getAlertEmail(), ";"))));
        email.setSubject(subject);
        email.setContent(content);
        taskParams.setParams(email);
        this.dispatcher.dispatch(taskParams);
    }

    /**
     * Looks up the delay that follows {@code originalExpiration} in {@link #RETRY_DELAYED_TIMES}.
     * <p>
     * An empty value, a non-numeric value, or a number that is not part of the ladder all yield
     * the first delay.
     *
     * @param originalExpiration the delay used for the previous attempt, as text; may be empty
     * @return the next delay, or {@code null} when the previous delay was the last one,
     *         meaning the notification has finally failed
     */
    protected Integer getNext(String originalExpiration) {
        if (StringUtils.isEmpty(originalExpiration)) {
            return RETRY_DELAYED_TIMES.get(0);
        }
        int index;
        try {
            index = RETRY_DELAYED_TIMES.indexOf(Integer.parseInt(originalExpiration.trim()));
        } catch (NumberFormatException e) {
            // A malformed header must not abort the retry flow; handle it like an unknown delay.
            log.warn("invalid delay value [{}], restart from first retry interval", originalExpiration);
            index = -1;
        }
        if (index >= RETRY_DELAYED_TIMES.size() - 1) {
            return null;
        }
        return RETRY_DELAYED_TIMES.get(index + 1);
    }

    /**
     * Re-queues the notification with the next delay, if there is one.
     *
     * @param record             the notification record (supplies tenant id and message id)
     * @param notification       the notification payload to send again
     * @param originalExpiration the delay used for the previous attempt, as text; may be empty
     * @return the delay that was scheduled, or {@code null} when no retry is left
     * @throws Exception if publishing the delayed message fails
     */
    protected Integer retry(MsgHttpNotifyRecordsEntity record, HttpNotifyParams notification, String originalExpiration) throws Exception {
        Integer next = getNext(originalExpiration);
        if (next == null) {
            // Retry ladder exhausted.
            log.debug("finish = {}", originalExpiration);
            return null;
        }
        // Publish the notification again with the next delay.
        log.debug("current ={} next ={}", originalExpiration, next);
        DelayedMessage delayedMessage = new DelayedMessage(record.getTenantId(), MsgConstants.MSG_DELAYED_QUEUE_RK_HTTP_NOTIFY, record.getMsgId(), next, notification);
        MqUtil.delayed(amqpTemplate, delayedMessage);
        return next;
    }

    /**
     * Stamps the record with the current time and updates it by id.
     *
     * @param record the fields to update; ignored when {@code null}
     */
    protected void modifyLog(MsgHttpNotifyRecordsEntity record) {
        if (record == null) {
            return;
        }
        record.setUpdateTime(new Date());
        httpNotifyRecordsService.updateById(record);
    }
}
